iT邦幫忙

2025 iThome 鐵人賽

DAY 22
0
Build on AWS

動漫宅的 30 天 AWS Lakehouse 修行日誌系列 第 22

Day22 淬鍊之章-多檔案上傳 ETL 流程-實作篇2

  • 分享至 

  • xImage
  •  

簡介

在上篇 Day21 淬鍊之章-多檔案上傳 ETL 流程-實作篇1 中,我們設計了一個 EventBridge 定期排程觸發 Lambda 並呼叫 Glue Workflow,今天要實際驗證這個排程是否如預期運作,並在最後加入 SNS 通知機制,讓系統能自動回報「成功或失敗」的執行狀態。


驗證排程運作狀況

在驗證 Lambda 是否成功運作之前,我們先來簡單的介紹一下 LambdaCloudWatch 之間的關係。

🔹 Amazon CloudWatch

  • 用途:集中收集各種 AWS 服務的 Logs 與 Metrics。
  • 特性
    • 自動接收 Lambda 的執行輸出(print()logger.info() 等)
    • 可建立 Dashboard 監控錯誤率與延遲
    • 可透過 Alarm + SNS 實現自動通知

🔗 整合方式

每個 Lambda 函數在建立時,AWS 會自動建立對應的 Log Group,例如:

/aws/lambda/check_files_and_trigger_workflow

每次 Lambda 被觸發時,都會自動產生一條 Log Stream,記錄:

  • 觸發時間
  • 輸出訊息(printlogger
  • 錯誤堆疊(若程式失敗)

👉 因此,我們可以透過 CloudWatch 直接觀察 Lambda 是否被 EventBridge 成功觸發、
執行結果如何,以及是否正確呼叫 Glue Workflow。

  • 根據下圖我們可以看到該 Lambda 的執行狀況,已有成功檢查到
    ✅ Found: Bronze/ratings/2025-10-06/ratings.csv
    ✅ Found: Bronze/ratings/2025-10-06/ratings.csv

  • 並且有實際去呼叫一個 Glue Workflow:Started Glue Workflow: wr_aa1c928fc88c3fba1f69651f69df83f5ec4998e4f3ddfdfa8f3bb5d45ba55f31

https://ithelp.ithome.com.tw/upload/images/20251006/20163443VfVuuHbUg0.png

Glue ETL Monitor 驗證

  • 當 Lambda 去處發 Glue Workflow 並回傳執行完畢的作業,我們就可以來查看 Glue 的 Monitor 是否有成功執行。
  • 我們可以根據下圖確認到 wf_animes_summary 串接的相關 Glue Job 都有成功被執行。

https://ithelp.ithome.com.tw/upload/images/20251006/20163443w4Q5kXEY5r.png

經以上驗證後,即可確認到排程是有正常被啟動,並呼叫目標 Lambda。
在測試階段時,其實是可以將 EventBridge 調整成每 1-5 分鐘執行一次,加快驗證速度。

Lambda 執行結果通知機制(整合 Amazon SNS)

在完成 EventBridge + Lambda 自動排程後,我們希望能在 Lambda 執行成功或失敗時,立即收到通知信件。這時候,就能使用 AWS 的 SNS(Simple Notification Service)

什麼是 Amazon SNS?

Amazon SNS 是一個「發佈/訂閱式(Pub/Sub)」的訊息服務,可讓多個應用程式透過主題(Topic)傳遞訊息給不同接收者(Email、SMS、Lambda、HTTP 等)。

在這個場景中,我們會讓:Lambda 成功或失敗時 → 呼叫 SNS → SNS 發送 Email 給你。

建立 SNS Topic

接下來我們來實際建立 SNS。

Step1:一樣在使用任何服務之前,我們都要先建立此服務的 Policy,由於我們也是需要使用 Lambda 來觸發 SNS 服務,所以首先我們要先將 SNS 的 Policy 指派給 Full_Lambda_Role 角色。

https://ithelp.ithome.com.tw/upload/images/20251006/20163443qy9jmUNu7T.png

Step2:接著我們一樣是要透過使用者 Andy 來建立 SNS Topic 所以我們要先把 SNS Policy 也指派給 DE Group 使用。
https://ithelp.ithome.com.tw/upload/images/20251006/20163443sC2ZlrCdnE.png

Step3:再來透過服務搜尋處找到 SNS 服務,並進入服務頁面
https://ithelp.ithome.com.tw/upload/images/20251006/20163443sfQLofkNnu.png

https://ithelp.ithome.com.tw/upload/images/20251006/20163443GYyLbrCcd8.png

Step4:接著我們來建立 Topic
https://ithelp.ithome.com.tw/upload/images/20251006/201634434RuTNj6am9.png

Step5:Topic 設定

  • Type:Standard
  • Nameanime-etl-alert
  • 其餘皆選擇預設即可點選「建立主題」

https://ithelp.ithome.com.tw/upload/images/20251006/20163443TSnj5denO1.png

https://ithelp.ithome.com.tw/upload/images/20251006/20163443rm5zttSXQW.png

以上流程即完成 Topic 的建立,接下來我們要來建立 Subscription。

建立訂閱 (Subscription)

Step1:進入訂閱頁面

https://ithelp.ithome.com.tw/upload/images/20251006/20163443OWe8PBhVAR.png

Step2:Subscription 設定

  • 主題 ARN:選擇剛剛建立的 Topic 即可
  • 通訊協定:電子郵件
  • 端點:請輸入你想接收到 SNS 通知的 Email
  • 其餘選項可以預設即可,完成後直接點選右下角「建立」

https://ithelp.ithome.com.tw/upload/images/20251006/20163443il1PoHoj6W.png

https://ithelp.ithome.com.tw/upload/images/20251006/20163443379CVyNsiJ.png

Step3:到剛剛設定的訂閱者 Email 查看有無收到驗證信,如沒問題就點選 「Confirm subscription」

https://ithelp.ithome.com.tw/upload/images/20251006/20163443SuJriseNMI.png

Step4:點選後就會跳轉到成功訂閱的通知頁面

https://ithelp.ithome.com.tw/upload/images/20251006/20163443RNUywNi9po.png

以上即完成 Subscription 的建立,接著我們要來修改 Lambda 讓他可以觸發 SNS 來通知訂閱者 Email。

新增 SNS 呼叫至 Lambda Function

Step1:修改 Lambda:check_files_and_trigger_workflow

  • 新增 sns.client 與 SNS_TOPIC_ARN (注意要調整為自己的 account_id)
  • 新增其餘 SNS 所需語法

https://ithelp.ithome.com.tw/upload/images/20251006/20163443suntjr6xnP.png

Python 語法:

import boto3
import datetime
import json

s3 = boto3.client("s3")
glue = boto3.client("glue")
sns = boto3.client("sns")

BUCKET_NAME = "anime-lake"
WORKFLOW_NAME = "wf_animes_summary"
SNS_TOPIC_ARN = "arn:aws:sns:ap-east-2:<account_id>:anime-etl-alert"

def lambda_handler(event, context):
    today = datetime.date.today().strftime("%Y-%m-%d")
    print(f"🔍 Checking files for date: {today}")

    expected_files = [
        f"Bronze/animes/{today}/animes.csv",
        f"Bronze/ratings/{today}/ratings.csv"
    ]

    missing_files = []

    # 🔎 檢查 S3 檔案存在與否
    for key in expected_files:
        try:
            s3.head_object(Bucket=BUCKET_NAME, Key=key)
            print(f"✅ Found: {key}")
        except s3.exceptions.ClientError:
            print(f"❌ Missing: {key}")
            missing_files.append(key)

    # ✅ 若所有檔案皆存在 → 觸發 Glue Workflow
    if not missing_files:
        try:
            response = glue.start_workflow_run(Name=WORKFLOW_NAME)
            run_id = response["RunId"]
            print(f"🚀 Started Glue Workflow: {run_id}")

            # SNS 成功通知
            message = {
                "status": "success",
                "date": today,
                "workflow_name": WORKFLOW_NAME,
                "workflow_run_id": run_id,
                "message": "All files found, Glue Workflow triggered successfully."
            }

            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject=f"✅ Glue Workflow Triggered for {today}",
                Message=json.dumps(message, indent=2)
            )
            print("📨 SNS success notification sent.")

            return {"statusCode": 200, "body": f"Workflow started for {today}"}

        except Exception as e:
            # SNS 失敗通知(執行 Glue Workflow 發生例外)
            error_message = {
                "status": "failed",
                "date": today,
                "workflow_name": WORKFLOW_NAME,
                "error": str(e)
            }

            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject=f"❌ Glue Workflow Failed for {today}",
                Message=json.dumps(error_message, indent=2)
            )
            print(f"🛑 Error starting Glue Workflow: {e}")

            return {"statusCode": 500, "body": f"Error: {str(e)}"}

    # ⚠️ 若有缺失檔案 → 跳過並通知 SNS
    else:
        warning_message = {
            "status": "skipped",
            "date": today,
            "missing_files": missing_files,
            "message": "Some files are missing. Workflow not triggered."
        }

        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=f"⚠️ Missing Files Detected for {today}",
            Message=json.dumps(warning_message, indent=2)
        )
        print("⚠️ SNS warning notification sent (missing files).")

        return {"statusCode": 200, "body": f"Missing files: {', '.join(missing_files)}"}

Step2:接著使用 Lambda 的測試功能來做測試,看看訂閱者 Email 是否有正常收到通知訊息

https://ithelp.ithome.com.tw/upload/images/20251006/20163443OuWh8twgkM.png

Step3:成功觸發 Lambda 後,我們可以看到 Email 有正常收到 AWS 通知 Glue ETL 的執行

https://ithelp.ithome.com.tw/upload/images/20251006/20163443rzOYGwobNb.png

  • 若想修改通知內容,可以找到上方 Lambda 語法內的這段自行調整
# SNS 成功通知
message = {
  "status": "success",
  "date": today,
  "workflow_name": WORKFLOW_NAME,
  "workflow_run_id": run_id,
  "message": "All files found, Glue Workflow triggered successfully."
         }

Step4:當改寫 Lambda 後,也需要再去確認一下後續的流程有無正常執行,所以我們來查看一下 Glue Monitor,看起來是有正常運作的。

https://ithelp.ithome.com.tw/upload/images/20251006/20163443A7PCJfWG5p.png

Step5:接著我們來看看失敗的通知會是什麼內容,我們可以將 S3 上當天的資料檔案刪除,在重新執行一次 Lambda 測試

https://ithelp.ithome.com.tw/upload/images/20251006/20163443rfTIE0s0DW.png

  • 可以看到 Email 有正常收到失敗通知,並提供缺漏檔案導致 Error 的問題,以及未觸發後續 Glue Workflow

https://ithelp.ithome.com.tw/upload/images/20251006/20163443hqoP5tdHvI.png

以上即為 Lambda 結合 SNS 可以做到的通知 Lambda 成功或失敗的效果,是不是非常便利壓?


結論與建議

透過本篇的實作,我們已經讓整個 EventBridge → Lambda → Glue Workflow 的排程自動化流程更加完整。
並藉由 Amazon SNS 的整合,讓系統具備「自我回報」的能力:

  • 自動通知成功狀態:當檔案齊全並成功啟動 Glue Workflow,即時寄信通知。
  • ⚠️ 檔案缺失提醒:若 S3 缺少必要資料,可立即收到警示信件,避免空跑。
  • 錯誤通知:任何觸發錯誤或 Glue Workflow 啟動失敗都能即時回報。

這樣的設計不僅符合 生產環境的監控需求
也能大幅減少手動檢查、重跑的成本。

💡 建議

  • 可以進一步將 SNS 結合 CloudWatch Alarm,在 Lambda 發生錯誤時自動通知。
  • 若團隊使用 Slack,可再串接 AWS Chatbot,讓通知直接發送至 Slack 頻道。
  • 若 Workflow 複雜,可考慮在 SNS 訊息中附加更多 metadata(例如檔案大小、ETL 時間等)。

整體來說,本篇的設計已經達到「可觀測、可維運、可自動回報」的目標,這也是邁向 生產級 Lakehouse Pipeline 的重要一步。


下篇預告

在下一篇 「Day23 淬鍊之章-資料補跑機制 Backfill 實作篇」 中,我們將延伸今天的內容,讓整個 Pipeline 具備:

  • 🔁 歷史資料回補能力:針對特定日期區間自動補跑 ETL。
  • 🗂️ 資料狀態追蹤 (Tracking):記錄每次補跑的執行結果。

這將讓你的 Data Lakehouse 流程更具彈性與穩定性。


📚 參考資料


上一篇
Day21 淬鍊之章-多檔案上傳 ETL 流程-實作篇1
下一篇
Day23 淬鍊之章-資料補跑機制 Backfill 實作篇
系列文
動漫宅的 30 天 AWS Lakehouse 修行日誌23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言